package org.hope.lee.consumer.cluster;

import java.nio.charset.StandardCharsets;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Demo entry point that starts one RocketMQ push consumer belonging to the
 * {@code consumer_cluster} consumer group and subscribed to the
 * {@code cluster_timeout_test} topic.
 */
public class ConsumerClusterMember2 {
	/**
	 * Configures the push consumer, registers {@link MqMessageListener2} as its
	 * message listener and starts it.
	 *
	 * @param args command-line arguments; not used
	 * @throws MQClientException if subscribing or starting the consumer fails
	 */
	public static void main(String[] args) throws MQClientException {
		final String consumerGroup = "consumer_cluster";
		final String nameServers = "192.168.31.176:9876;192.168.31.165:9876";
		final String topic = "cluster_timeout_test";
		// NOTE(review): "Tag B" and "Tage C" look like typos for "TagB" / "TagC";
		// confirm against the tags the producer actually sets before changing.
		final String tagExpression = "TagA || Tag B || Tage C";

		DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
		pushConsumer.setNamesrvAddr(nameServers);

		// Batch consumption: at most 10 messages per batch. Presumably this caps
		// the list passed to one listener call rather than the broker pull size
		// -- TODO confirm against the RocketMQ client documentation.
		pushConsumer.setConsumeMessageBatchMaxSize(10);

		// If this is not the first start, consumption continues from the position
		// where the previous run stopped.
		pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// Subscribe to the topic, filtered by the tag expression above.
		pushConsumer.subscribe(topic, tagExpression);
		pushConsumer.registerMessageListener(new MqMessageListener2());

		pushConsumer.start();
		System.out.println("Consumer Started.");
	}
}

class MqMessageListener2 implements MessageListenerConcurrently{
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		try {
			MessageExt msg = msgs.get(0);
			String msgBody = new String(msg.getBody(), "utf-8");
			System.out.println("msgBody:" + msgBody);
		} catch(Exception e) {
			e.printStackTrace();
			return ConsumeConcurrentlyStatus.RECONSUME_LATER;
		}
		return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}
	
}
